Шрифт:
Интервал:
Закладка:
Stateful операции, distinct, sorted, limit, skip, должны учитывать состояние из ранее обработанных элементов при обработке новых элементов.
Для вычисления результата с учетом состояния требуется обработка всего ввода.
Например, нельзя отсортировать поток до тех пор, пока не будут видны все элементы потока.
В результате при параллельном вычислении, стримы, содержащие промежуточные операции с промежуточным состоянием, требуют нескольких проходов обработки данные или требуют буферизации данных.
В отличие от этого, стримы, содержащие промежуточные операции без состояния, могут обрабатываться за один проход.
Поэтому stateful операции замедляют выполнение параллельных стримов.
Также сами параметры или лямбда выражения стрим операций могут быть stateless и stateful.
Лямбда stateful выражение — это выражение, результат которого зависит от любого состояния, которое может измениться при выполнении стрима.
Здесь мы вычисляем сумму целых чисел, отбрасывая дублирующие элементы, так как метод add класса HashSet добавляет только те элементы, которых еще нет в наборе.
То есть здесь мы используем лямбда выражение, которое должно учитывать состояние ранее обработанных элементов при обработке новых элементов.
И здесь мы не используем операцию distinct, которая автоматически учитывает все элементы стрима.
Соответственно, здесь элементы будут разбиты на группы, каждая из которых будет обрабатываться в своем потоке операцией add.
В каждой группе операция add отбросит дубликаты и все группы просуммируются.
Но так как дубликаты могут быть в соседних группах и при каждом таком выполнении элементы будут разбиваться на группы по-разному, результат вычисления будет разным.
Поэтому при вычислении параллельных стримов нельзя использовать stateful лямбда выражение, так как мы каждый раз будем получать разные результаты для одного и того же ввода.
Теперь, лямбда выражения операций параллельных стримов не должны давать сторонний эффект, то есть они не должны модифицировать другие данные программы или не должны изменяться другими элементами программы.
Здесь мы в параллельных потоках пытаемся модифицировать ArrayList.
При модификации ArrayList один поток увеличивает низлежащий массив и пытается скопировать туда данные.
В это время другой поток успевает туда скопировать свои данные.
Тогда при копировании данных первым потоком возникает исключение ArrayIndexOutOfBoundsException.
Если же мы здесь синхронизируем ArrayList, мы потеряем все преимущество параллелизма.
Также, лямбда-выражения в стрим операциях не должны интерферировать.
Интерференция возникает, когда источник стрима изменяется, при обработке стрима.
Эта ситуация похожа на сторонний эффект, только здесь модифицируется не внешний источник, а источник стрима.
Стримы позволяют выполнять параллельные операции над различными источниками данных, включая даже потоко небезопасные коллекции, такие как ArrayList.
Это возможно только в том случае, если не будет интерференции с источником данных во время выполнения конвейера.
Это верно также и для последовательных стримов.
Так как, если мы изменяем источник в промежуточной операции, она не выполняется, пока не будет вызвана терминальная операция.
Когда вызывается терминальная операция, она рассматривает не модифицированный источник.
Далее вызывается промежуточная операция, которая пытается этот источник модифицировать, и возникает исключение ConcurrentModificationException.
Таким образом, при использовании параллельных стримов, требуется тестирование и анализ алгоритма выполнения.
Реактивные потоки
Обработка данных эволюционировала от пакетных архитектур, которые собирают данные и впоследствии обрабатывают данные после достижения определенного порога, до поточно-ориентированных архитектур, которые захватывают и обрабатывают данные в реальном времени и очень быстро изменяют системы на основе обработанных результатов.
Напротив, для пакетной обработки может потребоваться гораздо больше времени.
Обработка потоков данных, особенно «живых» данных, объем которых не предопределен, требует особой осторожности в асинхронной системе.
Основная проблема заключается в том, что потребление ресурсов необходимо контролировать, чтобы быстрый источник данных не перегрузил адресат потока.
Асинхронность необходима для параллельного использования вычислительных ресурсов, что может значительно ускорить обработку данных.
Реактивные потоки Java обеспечивают стандарт для асинхронной обработки потока с неблокирующим обратным давлением.
Реактивный поток обеспечивает способ просигнализировать его источнику, чтобы ослабить производство данных, когда получатель потока становится перегруженным этими данными.
Эта сигнализация похожа на клапан на водопроводной трубе.
Закрытие этого клапана увеличивает обратное давление на источник, уменьшая нагрузку на получателя.
Целью этого фреймворка является управление обменом потоковыми данными на асинхронной границе, например, при передаче данных в другой поток, гарантируя, что получатель не будет вынужден буферизовать произвольные объемы данных.
Другими словами, обратное давление позволяет очередям, которые посредничают между потоками, быть ограниченными.
Java 9 обеспечивает реактивные потоки с помощью фреймворка публикации-подписки, также известного как Flow API, который состоит из классов Flow и SubmissionPublisher пакета java.util.concurrent.
Таким образом, Java 9 предоставляет механизм публикации-подписки, когда подписчик информирует издателя, что он готов принять заданное количество элементов, и, если эти элементы доступны, издатель выталкивает это количество элементов получателю.
Важно отметить, что эта связь является двусторонней, когда подписчик информирует издателя о том, сколько элементов он хочет обработать, а издатель выталкивает это количество элементов подписчику.
Это двухстороннее соединение между издателем и подписчиком называется подпиской.
И эта подписка связывает одного издателя с одним подписчиком, в отношении «один к одному», и может быть одноадресной или многоадресной.
То есть, у одного издателя может быть несколько подписчиков, подписанных на него, но один подписчик может быть подписан только на одного издателя, то есть у издателя может быть много подписчиков, но подписчик может подписаться не более чем на одного издателя.
Когда подписчик подписывается на издателя, издатель уведомляет подписчика о подписке, позволяя подписчику сохранять ссылку на подписку при желании.
Как только этот процесс уведомления будет завершен, подписчик может сообщить издателю, что он готов принять некоторое количество элементов.
Когда у издателя есть доступные элементы, он отправляет подписчику не более заданного количества элементов.
Если у издателя возникает ошибка, он сигнализирует об ошибке подписчику.
Если издатель перестает отправлять данные, он сигнализирует об этом подписчику.
Если подписчик уведомляется о том, что либо, произошла ошибка, либо издатель перестает отправлять данные, подписка считается отмененной и не взаимодействие между издателем и подписчиком завершается.
Если объект является как издателем, так и подписчиком, он называется процессором.
Процессор обычно выступает в качестве посредника между другим издателем и подписчиком (любой из которых может быть другим процессором), выполняя некоторое преобразование в потоке данных.
Например, может быть создан процессор, который отфильтровывает элементы, которые соответствуют некоторым критериям, прежде чем передавать их своему подписчику.
Класс Flow — это хранилище для четырех вложенных статических интерфейсов, методы которых устанавливают контролируемые потоком компоненты, в